#ifndef __LICH_COROUTINE_H
#define __LICH_COROUTINE_H

#include "sysy_conf.h"
#include "schedule.h"
#include "squeue.h"
#include "ylock.h"
#include "list.h"

#include <errno.h>
#include <string.h>

/** @file 协程使用模式
 *
 * 当前任务：
 * - schedule_task_get
 * - schedule_task_new [可选]
 * - schedule_yield    [可选]
 *
 * 派生任务：
 * - schedule_resume
 *
 * yield语义：
 * - 区别于多线程编程的关键点
 * - 视当前任务是否需要wait而定，可选
 *
 * 唤醒模式：
 * - 单任务
 * - 多任务（一次性，或队列模式，或其他）
 *
 * 参数传递：
 * - 若当前任务有yield，则可以传递本地stack上数据给新任务；否则，不能传递stack数据。[当前任务执行完成后，本地变量退出作用域]
 * - 若传递heap数据，由两任务协助进行分配和回收。
 *
 * 同步机制：
 * - plock
 * - spinlock etc [慎用]
 *
 * 编程注意事项：
 * - max stack size == 64K
 * - non-block任务
 * - 同步用plock，如同多线程编程一样，需要注意竞态条件和死锁
 */

/**
 * Worker state for operations that span multiple volumes, e.g. when
 * per-volume context information has to be maintained.
 */
typedef struct {
        // NOTE(review): presumably serializes access to `queue` -- confirm in
        // the implementation of co_worker_init()/co_worker_destroy().
        sy_spinlock_t lock;
        // Queue owned by the worker; co_worker_init() takes the squeue_cmp_func
        // that is presumably used for this queue -- TODO confirm.
        squeue_t queue;
} co_worker_t;

/**
 * One waiting task, linked into a wait queue.
 *
 * Used together with co_wait_ctx_t:wait_queue, and by the co_cond_* helpers
 * in this header.
 *
 * @note `hook` has to stay the first member: co_cond_broadcast() and
 *       co_cond_signal() cast a `struct list_head *` straight to
 *       `co_wait_task_t *`.
 */
typedef struct {
        // list linkage into the wait queue
        struct list_head hook;
        // handle of the waiting task, handed to schedule_resume() on wake-up
        task_t task;
} co_wait_task_t;

/**
 * Wake-up context: either wake a single task, or wake a queue of tasks, or
 * some other wake-up mode.
 *
 * The user defines a context structure that contains this union and passes
 * it as the argument to the new task (which is the one that wakes the
 * current task); the same structure also carries whatever parameters the
 * new task needs.
 */
typedef union {
        // the current task; used in single-task wake-up mode
        task_t task;
        // queue of waiting tasks that the current task joins
        struct list_head wait_queue;
} co_wait_ctx_t;


/*
 * co_worker_t lifecycle; both functions are defined outside this header.
 *
 * NOTE(review): the `co_worker_t **` parameter suggests co_worker_init()
 * allocates the worker and returns it through `worker`, and that `cmp_func`
 * configures the worker's squeue -- confirm against the implementation.
 * Both presumably return 0 on success and an errno-style value on failure,
 * like the inline helpers in this file.
 */
int co_worker_init(co_worker_t **worker, squeue_cmp_func cmp_func);
int co_worker_destroy(co_worker_t *worker);

// -- general

/**
 * User callback run inside a spawned task.
 *
 * @param arg opaque pointer supplied by the caller
 * @return a status code; __do_pair() forwards it to the waiting task through
 *         schedule_resume()
 */
typedef int (*co_func_t)(void *arg);

// -- 1:1 pattern: co_pair_t

/**
 * State shared between the waiting task and the helper task of one
 * co_pair_wait() call (1:1 pattern).
 */
typedef struct {
        // the waiting task; filled in by co_pair_wait() from schedule_task_get()
        // and resumed by __do_pair()
        task_t task;
        // user-defined
        // copy of the task name given to co_pair_wait()
        char name[MAX_NAME_LEN];
        // callback run by __do_pair(); may be NULL, in which case the result is 0
        co_func_t func;
        // argument passed to `func`
        void *arg;
} co_pair_t;

/**
 * Placeholder type with no members; nothing in this header refers to it.
 *
 * @note A struct without members is a compiler extension, not ISO C.
 */
typedef struct {
        // intentionally empty
} co_pair_ctx_t;


/**
 * Task entry used by co_pair_wait(): run the user callback, then wake the
 * task recorded in the pair.
 *
 * @param arg pointer to the co_pair_t prepared by co_pair_wait()
 *
 * The callback's return value (0 when no callback is set) is passed to
 * schedule_resume() as the resume status.
 */
static inline void __do_pair(void *arg) {
        co_pair_t *pair = arg;
        int ret = pair->func ? pair->func(pair->arg) : 0;

        schedule_resume(&pair->task, ret, NULL);
}

/**
 * Run `func(arg)` in a newly created task and suspend the calling task until
 * that task resumes it (1:1 pattern).
 *
 * @param pair      caller-provided state; must stay valid until this function
 *                  returns because the new task reads it
 * @param task_name name for the new task and for the yield point; names that
 *                  do not fit into pair->name are truncated in the stored copy
 * @param func      callback to run in the new task; may be NULL
 * @param arg       argument for `func`
 * @param need_free when non-zero and `arg` is not NULL, `arg` is released with
 *                  yfree() after a successful wait
 * @return 0 on success, otherwise the non-zero value returned by
 *         schedule_yield()
 *
 * NOTE(review): on the error path `arg` is not freed even when `need_free` is
 * set. Whether that is a leak depends on whether the helper task can still be
 * using `arg` when schedule_yield() fails -- confirm before changing.
 */
static inline int co_pair_wait(co_pair_t *pair, const char *task_name, co_func_t func, void *arg, int need_free) {
        int ret;

        // bounded copy: pair->name is a fixed MAX_NAME_LEN buffer and task_name
        // comes from the caller; strncpy alone does not guarantee termination
        strncpy(pair->name, task_name, sizeof(pair->name) - 1);
        pair->name[sizeof(pair->name) - 1] = '\0';
        pair->func = func;
        pair->arg = arg;

        // record ourselves first so that __do_pair() knows whom to resume
        pair->task = schedule_task_get();
        schedule_task_new(task_name, __do_pair, pair, -1);
        ret = schedule_yield(task_name, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (need_free && arg != NULL)
                yfree((void **)&arg);

        return 0;
err_ret:
        return ret;
}

// -- co_cond_t

/**
 * Condition-variable style wait queue for tasks.
 */
typedef struct {
        // list of co_wait_task_t nodes, one per waiting task
        struct list_head wait_queue;
        // number of nodes currently linked into wait_queue
        int size;
} co_cond_t;

/**
 * Reset a condition variable to the empty state (no waiters).
 *
 * @param cv condition variable to initialize
 * @return always 0
 */
static inline int co_cond_init(co_cond_t *cv) {
        cv->size = 0;
        INIT_LIST_HEAD(&cv->wait_queue);

        return 0;
}

/**
 * Queue the calling task on `cv` and yield until it is resumed.
 *
 * Intended to be called in a loop that re-checks the external condition
 * after every wake-up.
 *
 * The wait record lives on this function's stack; co_cond_signal() and
 * co_cond_broadcast() unlink it when they resume the task.
 *
 * @param cv   condition variable to wait on
 * @param name label passed to schedule_yield() and used in the warning
 * @return 0 on success, otherwise the non-zero value returned by
 *         schedule_yield()
 *
 * NOTE(review): the error path does not unlink the stack record itself; this
 * is only safe if a non-zero result always comes from a signal/broadcast that
 * already removed it -- confirm against schedule_yield().
 */
static inline int co_cond_wait2(co_cond_t *cv, const char *name) {
        co_wait_task_t waiter;
        int ret;

        // register ourselves at the tail of the wait queue
        waiter.task = schedule_task_get();
        list_add_tail(&waiter.hook, &cv->wait_queue);
        cv->size++;

        ret = schedule_yield(name, NULL, NULL);
        if (unlikely(ret)) {
                DWARN("wait %s fail\n", name);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Wait on `cv` using the default label "co_cond_wait".
 *
 * @param cv condition variable to wait on
 * @return whatever co_cond_wait2() returns
 */
static inline int co_cond_wait(co_cond_t *cv) {
        int ret = co_cond_wait2(cv, "co_cond_wait");

        return ret;
}

/**
 * Resume every task currently waiting on `cv`.
 *
 * @param cv  condition variable
 * @param ret status handed to schedule_resume() for each waiter
 * @return always 0
 */
static inline int co_cond_broadcast(co_cond_t *cv, int ret) {
        struct list_head *cur, *next;

        list_for_each_safe(cur, next, &cv->wait_queue) {
                // valid because `hook` is the first member of co_wait_task_t
                co_wait_task_t *waiter = (co_wait_task_t *)cur;

                schedule_resume(&waiter->task, ret, NULL);

                // the node lives on the waiter's stack: unlink it here
                list_del_init(cur);
                cv->size -= 1;
                DBUG("resume task %u size %d\n", waiter->task.taskid, cv->size);
        }

        return 0;
}

/**
 * Resume the first task waiting on `cv`, if there is one.
 *
 * @param cv  condition variable
 * @param ret status handed to schedule_resume() for the woken waiter
 * @return always 0
 */
static inline int co_cond_signal(co_cond_t *cv, int ret) {
        struct list_head *cur, *next;

        // the loop body runs at most once: it leaves after the head waiter
        list_for_each_safe(cur, next, &cv->wait_queue) {
                // valid because `hook` is the first member of co_wait_task_t
                co_wait_task_t *waiter = (co_wait_task_t *)cur;

                schedule_resume(&waiter->task, ret, NULL);

                // the node lives on the waiter's stack: unlink it here
                list_del_init(cur);
                cv->size -= 1;
                DBUG("resume task %u size %d\n", waiter->task.taskid, cv->size);
                break;
        }

        return 0;
}

/**
 * Tear down a condition variable: resume all remaining waiters with status 0
 * and assert that the queue is empty afterwards.
 *
 * @param cv condition variable
 * @return always 0
 */
static inline int co_cond_destroy(co_cond_t *cv) {
        // co_cond_broadcast() always returns 0, so its result is not checked
        (void)co_cond_broadcast(cv, 0);

        YASSERT(cv->size == 0);

        return 0;
}

// -- stream pattern: co_mq_t

/**
 * Message-queue state for the stream pattern.
 *
 * The functions operating on this type (co_mq_init(), co_mq_offer(), ...)
 * are defined outside this header, so the field notes below are hedged.
 */
typedef struct {
        // pending items -- presumably co_mq_ctx_t nodes linked through their
        // `hook` member; TODO confirm
        count_list_t queue;
        // queue name, as passed to co_mq_init()/co_mq_init_batch() -- presumably
        char name[MAX_NAME_LEN];
        // NOTE(review): looks like a running task counter -- confirm
        uint64_t task_count;
        // batch setting, see co_mq_init_batch(); exact meaning not visible here
        int batch;

        // internals
        co_func_t swf;
        void *swf_arg;
} co_mq_t;

/**
 * Per-request context for a co_mq_t.
 *
 * The user-defined fields mirror the parameters of co_mq_offer().
 */
typedef struct {
        // list linkage -- presumably into co_mq_t::queue; TODO confirm
        struct list_head hook;
        // task handle associated with this request
        task_t task;
        // owning queue
        co_mq_t *mq;

        // user-defined
        char name[MAX_NAME_LEN];
        co_func_t func;
        void *arg;
        // NOTE(review): presumably non-zero means `arg` is freed once the
        // request is done, as in co_pair_wait() -- confirm
        int need_free;
} co_mq_ctx_t;

/*
 * co_mq_t API; all four functions are defined outside this header.
 * NOTE(review): presumably each returns 0 on success and an errno-style
 * value on failure, like the inline helpers in this file -- confirm.
 */

// initialize `mq` under the given name
int co_mq_init(co_mq_t *mq, const char *name);
// same as co_mq_init(), with an explicit `batch` setting
int co_mq_init_batch(co_mq_t *mq, const char *name, int batch);

// release `mq`
int co_mq_destroy(co_mq_t *mq);

// new task, but no yield
int co_mq_offer(co_mq_t *mq, const char *task_name, co_func_t func, void *arg, int need_free);

// -- master-slave pattern

// -- fork and join

/**
 * Result slot for one forked task.
 */
typedef struct {
        // status reported by the task through co_fork_return()
        int retval;
        // task context registered through co_fork_add()
        void *ctx;
} co_fork_entry_t;

/**
 * Fork/join group: one parent task plus up to `capacity` child tasks.
 *
 * Allocated by co_fork_create() together with its trailing `entries` array.
 */
typedef struct {
        // name given to every child task and to the join yield point
        char name[MAX_NAME_LEN];
        // parent task, resumed when the last child has returned
        task_t task;
        int active;     ///< number of active tasks
        int capacity;   ///< total number of tasks
        // one slot per child; `capacity` elements follow the struct
        co_fork_entry_t entries[0];
} co_fork_t;

/**
 * Back-reference from a child task's context to its fork group.
 *
 * @note Embedded in the ctx structure and must be its first member:
 *       co_fork_add() casts the ctx pointer to co_fork_hook_t *.
 */
typedef struct {
        co_fork_t *fork;      ///< the fork group (a single instance)
        int idx;              ///< index of this ctx in fork->entries
} co_fork_hook_t;

/**
 * Allocate a fork/join group with room for `capacity` child tasks and bind
 * it to the calling task.
 *
 * @note co_fork_create() and co_fork_join() must be used as a pair; there
 *       should be no jump or interruption between them.
 *
 * @see co_fork_join
 *
 * @param fork     out: the new group, or NULL on failure
 * @param name     name for the group; names that do not fit into the
 *                 MAX_NAME_LEN buffer are truncated
 * @param capacity number of child tasks that will be added; must be >= 0
 * @return 0 on success; EINVAL for a negative capacity; ENOMEM when the
 *         requested size cannot be represented; otherwise the error returned
 *         by ymalloc()
 *
 * NOTE(review): nothing in this header releases the group; presumably the
 * caller frees it with yfree() after co_fork_join() -- confirm.
 */
static inline int co_fork_create(co_fork_t **fork, const char *name, int capacity) {
        int ret;
        size_t len;
        co_fork_t *_fork;

        *fork = NULL;

        if (unlikely(capacity < 0)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        // reject sizes that would wrap around in size_t
        if (unlikely((size_t)capacity > ((size_t)-1 - sizeof(co_fork_t)) / sizeof(co_fork_entry_t))) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        // header plus one result slot per child, computed in size_t
        len = sizeof(co_fork_t) + sizeof(co_fork_entry_t) * (size_t)capacity;
        ret = ymalloc((void **)&_fork, len);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(_fork, 0x0, len);

        // bounded copy into the fixed-size name buffer
        strncpy(_fork->name, name, sizeof(_fork->name) - 1);
        _fork->name[sizeof(_fork->name) - 1] = '\0';
        _fork->task = schedule_task_get();
        _fork->capacity = capacity;
        _fork->active = 0;

        *fork = _fork;
        return 0;
err_ret:
        return ret;
}

/**
 * Register `ctx` in the next free slot of `fork` and start a task running
 * `func(ctx)`.
 *
 * @param fork group created by co_fork_create()
 * @param func entry point of the child task
 * @param ctx  child context; its first member must be a co_fork_hook_t,
 *             which is filled in here with the group and the slot index
 *
 * Asserts that the group still has a free slot.
 */
static inline void co_fork_add(co_fork_t *fork, func_t func, void *ctx) {
        const int idx = fork->active;
        co_fork_hook_t *hook = ctx;
        co_fork_entry_t *entry;

        YASSERT(idx < fork->capacity);

        DBUG("idx %d ctx %p\n", idx, ctx);

        entry = &fork->entries[idx];
        entry->ctx = ctx;

        // TODO: the hook must be the first member of ctx (not checked here)
        hook->fork = fork;
        hook->idx = idx;

        // start the child, then count it as active
        schedule_task_new(fork->name, func, ctx, -1);

        fork->active++;
}

/**
 * Report the result of one child task; the last child to report resumes the
 * parent task.
 *
 * @param fork group the child belongs to
 * @param idx  slot index of the child, as stored in its co_fork_hook_t
 * @param ret  child status, stored in the slot's `retval`
 *
 * Asserts that at least one child is still active and that `idx` is a valid
 * slot index (0 <= idx < capacity).
 */
static inline void co_fork_return(co_fork_t *fork, int idx, int ret) {
        YASSERT(fork->active > 0);
        // a negative idx would write before the entries array
        YASSERT(idx >= 0 && idx < fork->capacity);

        fork->entries[idx].retval = ret;
        fork->active--;
        if (fork->active == 0) {
                // TODO 0 or ret
                // the parent inspects each entry's retval to judge the outcome
                schedule_resume(&fork->task, 0, NULL);
        }
}

/**
 * Suspend the parent task until every child of `fork` has called
 * co_fork_return(), then check the collected results.
 *
 * Asserts that exactly `capacity` children were added and that none has
 * returned yet.
 *
 * @param fork group created by co_fork_create()
 * @return 0 when every entry's retval is 0 (or the group is empty); EAGAIN
 *         when at least one child reported a non-zero status; otherwise the
 *         non-zero value returned by schedule_yield()
 */
static inline int co_fork_join(co_fork_t *fork) {
        int ret, i, success = 0;
        co_fork_entry_t *ent;

        YASSERT(fork->active == fork->capacity);

        // nothing was forked: no child will ever call co_fork_return(), so
        // yielding here would never be resumed
        if (fork->capacity == 0)
                return 0;

        ret = schedule_yield(fork->name, NULL, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        YASSERT(fork->active == 0);

        // count the children that reported success
        for (i=0; i < fork->capacity; i++) {
                ent = &fork->entries[i];
                if (ent->retval == 0) {
                        success++;
                } else {
                        DWARN("ret %d\n", ent->retval);
                }
        }

        DBUG("capacity %d success %d\n", fork->capacity, success);

        if (unlikely(success != fork->capacity)) {
                DWARN("capacity %d success %d\n", fork->capacity, success);
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Visitor for co_fork_iterate().
 *
 * @param _ctx the entry's ctx pointer
 * @param ret  the entry's retval
 * @param arg  opaque pointer passed through from co_fork_iterate()
 * @return 0 to continue; a non-zero value stops the iteration and is
 *         returned by co_fork_iterate()
 */
typedef int (*fork_iterate_fun)(void *_ctx, int ret, void *arg);

/**
 * Call `fun` on every entry of `fork`, in slot order.
 *
 * @param fork group to walk; all `capacity` slots are visited
 * @param fun  visitor, called with the entry's ctx, its retval and `arg`
 * @param arg  opaque pointer forwarded to `fun`
 * @return 0 when every call returned 0, otherwise the first non-zero value
 *         returned by `fun` (the remaining entries are skipped)
 */
static inline int co_fork_iterate(co_fork_t *fork, fork_iterate_fun fun, void *arg) {
        int ret;
        int idx = 0;

        while (idx < fork->capacity) {
                co_fork_entry_t *entry = &fork->entries[idx];

                ret = fun(entry->ctx, entry->retval, arg);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                idx++;
        }

        return 0;
err_ret:
        return ret;
}

#endif
